
fix scan iter command issued to different replicas #3220

Open
wants to merge 44 commits into master

Conversation

@agnesnatasya commented Apr 30, 2024

Pull Request check-list

  • Do tests and lints pass with this change?
    • lint passes
    • tests pass on Ubuntu 22.04, Python 3.11.2
Starting Redis tests
= 2352 passed, 1235 skipped, 849 deselected, 29 xpassed, 347 warnings in 163.65s (0:02:43) =
Waiting for 6 cluster nodes to become available
All nodes are available!
= 1571 passed, 1494 skipped, 1396 deselected, 4 xpassed, 244 warnings in 728.56s (0:12:08) =
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)? all succeeded in my fork.
  • Is the new or changed code fully tested? added test coverage
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)? bugfix only
    • I added some docstrings to the redis.asyncio.sentinel module. I wanted to check whether the rST syntax is correct, but this module is not included in the builddir's index, so I think it's a no-op.
  • Is there an example added to the examples folder (if applicable)? bugfix only
  • Was the change added to CHANGES file? bugfix only

fix scan iter command issued to different replicas

Fixes #3197. See the linked issue for a full description of the bug.
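
For context, here is roughly how the bug shows up (a minimal sketch only; the sentinel address, the service name mymaster, and the key pattern are illustrative assumptions, not details from the issue). A client obtained from a replica-mode SentinelConnectionPool round-robins across replicas, so before this fix each SCAN continuation inside scan_iter could be sent to a different replica, and a cursor returned by one replica is not valid on another.

```python
# Minimal sketch of the failure mode (address, service name and key pattern
# are assumptions). Before this fix, each SCAN issued by scan_iter could
# check out a connection to a *different* replica, so the cursor from one
# replica was replayed against another and keys could be missed or repeated.
from redis.sentinel import Sentinel

sentinel = Sentinel([("localhost", 26379)], socket_timeout=0.5)
replica = sentinel.slave_for("mymaster", socket_timeout=0.5)

# Every iteration below sends SCAN <cursor>; the fix pins the whole
# iteration to the replica that answered the first SCAN.
keys = set(replica.scan_iter(match="user:*", count=100))
```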

@agnesnatasya (Author)

Some of the CI tests fail flakily with a segmentation fault - if I rerun the CI in my fork, they pass most of the time. But I don't have permission to rerun it in the main repository.
Do you know whether this is expected? @gerzse
Thank you!!

@gerzse (Contributor) left a comment

Thanks, this is a valid point. I left some comments in the changes, and I have one more general comment: the implementation now covers the async code. Did you leave out the sync code intentionally, or would it make sense to adapt that one as well?

else:
# Check from the available connections, if any of the connection
# is connected to the host and port that we want
for available_connection in self._available_connections.copy():
Contributor

What if there are many connections? This linear search might slow things down.

Author

Great point! I decided to create a new data structure for the available connections in SentinelConnectionPool, called ConnectionsIndexer, which indexes each connection by its address.
The other connection pools' data structure will still simply be a list.
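
For illustration, an address-indexed container could look roughly like this (a sketch only; apart from the class name ConnectionsIndexer mentioned above, the method and attribute names are assumptions, not the PR's actual code):

```python
from collections import defaultdict


class ConnectionsIndexer:
    """Sketch of an address-indexed replacement for the flat list of
    available connections: looking up a connection to a given (host, port)
    becomes O(1) instead of a linear scan over every idle connection."""

    def __init__(self):
        self._by_address = defaultdict(list)

    def append(self, connection):
        # Return a connection to the pool, bucketed by the replica it points to.
        self._by_address[(connection.host, connection.port)].append(connection)

    def pop(self):
        # Hand out any idle connection, mimicking list.pop() for callers
        # that do not care which replica they talk to.
        for bucket in self._by_address.values():
            if bucket:
                return bucket.pop()
        raise IndexError("pop from an empty ConnectionsIndexer")

    def get_connection(self, host, port):
        # Hand out an idle connection to a specific replica, if one exists.
        bucket = self._by_address.get((host, port))
        return bucket.pop() if bucket else None
```

The common path (any idle connection) stays as cheap as before, while the "same replica as the first SCAN" lookup becomes constant time.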

@@ -122,6 +147,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None
self._request_id_to_replica_address = {}
Contributor

Would it make sense to use a more general name, e.g. context_id instead of request_id? Would that better express the fact that you are basically trying to run several commands in the same context?

Author

I don't have a strong opinion, but I think request_id conveys the semantics pretty well. context sounds broad, and I'm afraid it would get extended for other 'context storing' purposes, although this really just holds entries for iter requests.
I can rename it to self._iter_req_id_to_... instead, but let me know if you prefer other names!

await self.release(connection)
raise
# Store the connection to the dictionary
self._request_id_to_replica_address[iter_req_id] = (
Contributor

When would these entries be removed from the dict? So that it does not grow indefinitely.

Author

Yeah, good point. I created a separate cleanup method that is called at the end of the scan_iter-family commands.
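
Roughly, the idea discussed in this thread could look like the sketch below (illustrative only; the dictionary name comes from the diff above, while the class, method names, and id generation are assumptions, not the PR's actual code):

```python
import uuid


class _IterRequestTracker:
    """Illustrative sketch: remember which replica served the first SCAN of
    an iteration so every later cursor continuation is routed back to it,
    then drop the entry when the iteration ends so the mapping cannot grow
    without bound."""

    def __init__(self):
        self._request_id_to_replica_address = {}

    def remember(self, iter_req_id, connection):
        # Called right after the first SCAN of an iteration gets a connection.
        self._request_id_to_replica_address[iter_req_id] = (
            connection.host,
            connection.port,
        )

    def address_for(self, iter_req_id):
        # Later SCANs ask for the pinned replica; None means "not pinned yet".
        return self._request_id_to_replica_address.get(iter_req_id)

    def cleanup(self, iter_req_id):
        # Invoked at the end of a scan_iter-family command.
        self._request_id_to_replica_address.pop(iter_req_id, None)


# One id per scan_iter() call, generated before the first SCAN is sent.
iter_req_id = uuid.uuid4()
```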

@gerzse (Contributor) commented Jun 14, 2024

Some of the CI tests fail flakily with a segmentation fault - if I rerun the CI in my fork, they pass most of the time. But I don't have permission to rerun it in the main repository. Do you know whether this is expected? @gerzse Thank you!!

Those segmentation faults have been there for a long time, to be honest I have no idea why they happen. They are so annoying. Eventually I'll spend some time trying to dig deeper.

@agnesnatasya (Author) left a comment

Thank you for reviewing this @gerzse, appreciate it! Great points, I've addressed your comments. This is also a bug in the sync code, I've added the fix to the sync code as well!

@agnesnatasya (Author) left a comment

Sorry it took a while for me to address your comments @gerzse, it involved more changes than I expected, but this version should be ready for another round of review! Thank you very much!
I ran the CI workflow on my fork; it's passing except for the tests that I believe are being fixed in #3324.

Comment on lines -1188 to -1204
# ensure this connection is connected to Redis
connection.connect()
# if client caching is not enabled connections that the pool
# provides should be ready to send a command.
# if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
# (if caching enabled the connection will not always be ready
# to send a command because it may contain invalidation messages)
try:
if connection.can_read() and connection.client_cache is None:
raise ConnectionError("Connection has data")
except (ConnectionError, OSError):
connection.disconnect()
connection.connect()
if connection.can_read():
raise ConnectionError("Connection not ready")
Author

I refactored this into ensure_connection so that the subclass can call it directly. There's also a similar block of code in BlockingConnectionPool, except that it doesn't have the and connection.client_cache is None clause. But it looks to me like there's nothing special about client_cache in BlockingConnectionPool.
Should we refactor that out as well, or is the absence of that clause an intentional distinction for BlockingConnectionPool?
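
For reference, the extracted helper could look roughly like this (a sketch built from the removed block quoted above; the method name ensure_connection comes from this comment, while the simplified class body is an assumption rather than the PR's exact code):

```python
from redis.exceptions import ConnectionError


class ConnectionPool:
    # Simplified sketch: the readiness check that used to live inline in
    # get_connection() becomes a helper that SentinelConnectionPool and
    # BlockingConnectionPool can call on any connection they hand out.
    def ensure_connection(self, connection):
        # Make sure the connection is established.
        connection.connect()
        try:
            # A pooled connection should have no unread data unless client
            # caching is enabled (invalidation messages may be pending).
            if connection.can_read() and connection.client_cache is None:
                raise ConnectionError("Connection has data")
        except (ConnectionError, OSError):
            # Reconnect and verify the connection is usable.
            connection.disconnect()
            connection.connect()
            if connection.can_read():
                raise ConnectionError("Connection not ready")
```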

from redis.sentinel import Sentinel, SentinelConnectionPool, SentinelManagedConnection
from redis.utils import HIREDIS_AVAILABLE

pytestmark = pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only")
Author

I marked these tests to ignore hiredis. I'm not very familiar with the hiredis setup, but it looks like the hiredis code paths don't really work well when we call connection-related operations like can_read (and as far as I understand this is unrelated to my change). Let me know if I should not skip hiredis.

@agnesnatasya requested a review from gerzse July 20, 2024 10:27
Development

Successfully merging this pull request may close these issues.

scan_iter family commands gives inconsistent result when using Sentinel connection pool
2 participants